[FEATURE] Support multiple LLM connectors with proper rate limiting #285
fen-qin wants to merge 6 commits into
Conversation
e198c3c to 099c74b
martin-gaievski left a comment
This is a good addition to the system, thanks for this contribution.
A few high-level comments on top of what I posted in the code:
- Can we create a single class for LLM params, e.g. LLMConfiguration, where we keep things like rateLimit, maxRetries, retryBackoff, maxTokens, etc.? Currently those params are spread all over the code in different places. A separate class should help keep a logical structure and decouple logic from parameters. A minimal sketch is shown after this list.
- We need to collect metrics on retries. I think a good way to do it is to use the stats API that is part of the SRW repo; check the PR where it was introduced, #63. We can collect the number of retry attempts and the delay.
- In future we need to think about a separate pool for retries. The current implementation uses CompletableFuture.delayedExecutor, which runs on the common ForkJoinPool; under high load with many retries this could exhaust the thread pool. This may be too much for this PR, more like a follow-up change.
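A minimal sketch of what such a parameter holder could look like; the class and field names follow the suggestion above and are illustrative, not the PR's actual code:

```java
/**
 * Illustrative holder for LLM call parameters, as suggested above.
 * Field names (rateLimit, maxRetries, retryBackoffMs, maxTokens) are assumptions.
 */
public class LLMConfiguration {
    private final long rateLimit;      // requests per minute, 0 = unlimited
    private final int maxRetries;      // maximum retry attempts on throttling
    private final long retryBackoffMs; // base backoff between retries
    private final int maxTokens;       // token budget requested from the model

    public LLMConfiguration(long rateLimit, int maxRetries, long retryBackoffMs, int maxTokens) {
        this.rateLimit = rateLimit;
        this.maxRetries = maxRetries;
        this.retryBackoffMs = retryBackoffMs;
        this.maxTokens = maxTokens;
    }

    public long getRateLimit() { return rateLimit; }
    public int getMaxRetries() { return maxRetries; }
    public long getRetryBackoffMs() { return retryBackoffMs; }
    public int getMaxTokens() { return maxTokens; }
}
```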
ignoreFailure
);
List<String> unprocessedDocIds = docIds;
// List<String> unprocessedDocIds = deduplicateFromCache(
this is not needed in the final version
Yes, sure, this should be removed.
private boolean isRetryableError(Exception e) {
    String message = e.getMessage();
    if (message == null) return true;
why is it retryable if there is no message? Also please fix the style and use curly braces
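One possible shape of the fix, with braces and an explicit decision for the null-message case; whether a missing message should be retryable is the author's call, so this is only a sketch:

```java
private boolean isRetryableError(Exception e) {
    String message = e.getMessage();
    if (message == null) {
        // No message means we cannot classify the failure; treating it as
        // non-retryable is one conservative option (an assumption, not the PR's choice).
        return false;
    }
    // Illustrative classification: retry only on throttling-style errors.
    String lower = message.toLowerCase();
    return lower.contains("throttl") || lower.contains("rate limit") || lower.contains("429");
}
```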
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class BedrockRateLimiter {
Not everyone will use Bedrock as a platform. We need a RateLimiter interface, and BedrockRateLimiter will be one possible implementation of it.
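A sketch of how such a split could look; the method names on the interface are illustrative, not the PR's actual API:

```java
/** Provider-agnostic rate limiting contract (method names are assumptions). */
public interface RateLimiter {
    /** Delays as needed before the next request for the given model/connector key. */
    void acquire(String modelKey);

    /** Lets implementations adapt after a throttling response. */
    void onThrottled(String modelKey);
}

/** Bedrock-specific pacing becomes one implementation behind the common interface. */
public class BedrockRateLimiter implements RateLimiter {
    @Override
    public void acquire(String modelKey) {
        // Bedrock-specific pacing logic would live here.
    }

    @Override
    public void onThrottled(String modelKey) {
        // e.g. back off more aggressively on Bedrock throttling responses.
    }
}
```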
According to the high-level suggestions plus this thread: yes, we should introduce a more general class, MLConfiguration, that can hold all the common pieces:
- rateLimit, maxRetries, retryBackoff, maxTokens, etc.
- prompt formatter
- response processor
This should make it clearer for anyone who wants to add a new model with a customized input/output interface and settings.
ActionListener<String> listener
) {
    // Apply rate limiting per chunk to handle multiple chunks per query
    BedrockRateLimiter.applyRateLimit(connectorType, customRateLimit);
We need to create the concrete implementation of the RateLimiter interface using some sort of factory, or a factory method; it probably makes sense to hold it as a class variable. Then call only the interface methods.
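A sketch of the factory-method idea under the assumptions above (the RateLimiter interface and the PR's ConnectorType enum); the processor class name and the mapping are illustrative:

```java
/** Illustrative factory; real code would map connector types to concrete limiters. */
final class RateLimiterFactory {
    private RateLimiterFactory() {}

    static RateLimiter create(ConnectorType connectorType) {
        // Plug in provider-specific limiters here as they are added;
        // a generic adaptive limiter serves as the fallback.
        return new AdaptiveRateLimiter();
    }
}

class LlmJudgmentProcessorSketch {
    // Held as a class variable, so the processor only ever calls interface methods.
    private final RateLimiter rateLimiter;

    LlmJudgmentProcessorSketch(ConnectorType connectorType) {
        this.rateLimiter = RateLimiterFactory.create(connectorType);
    }

    void beforeChunk(String modelKey) {
        rateLimiter.acquire(modelKey);
    }
}
```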
this.rateLimit = rateLimitObj != null ? rateLimitObj : 0L;
}

public String getQuerySetId() {
For the bunch of getters you can use the Lombok @Getter annotation.
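For example, assuming Lombok is already available in the project (the class and field names below are illustrative):

```java
import lombok.Getter;

// @Getter generates getQuerySetId(), getRateLimit(), etc. at compile time,
// replacing the hand-written getters.
@Getter
public class LlmJudgmentRequestContext {
    private final String querySetId;
    private final long rateLimit;

    public LlmJudgmentRequestContext(String querySetId, long rateLimit) {
        this.querySetId = querySetId;
        this.rateLimit = rateLimit;
    }
}
```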
    }
}

lastRequestTimes.put(key, System.currentTimeMillis());
The map can grow indefinitely; I cannot see where we delete from it.
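One common way to keep such a map bounded is time-based eviction of stale keys; a self-contained sketch (the TTL and class name are assumptions, not part of the PR):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

class LastRequestTracker {
    // Entries older than this TTL are dropped so the map cannot grow without bound.
    private static final long ENTRY_TTL_MS = TimeUnit.MINUTES.toMillis(30);

    private final Map<String, Long> lastRequestTimes = new ConcurrentHashMap<>();

    void recordRequest(String key) {
        lastRequestTimes.put(key, System.currentTimeMillis());
        evictStaleEntries();
    }

    private void evictStaleEntries() {
        long cutoff = System.currentTimeMillis() - ENTRY_TTL_MS;
        lastRequestTimes.entrySet().removeIf(e -> e.getValue() < cutoff);
    }
}
```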
long sleepTime = delayMs - timeSinceLastRequest;
try {
    log.debug("Rate limiting {}: sleeping for {} ms", modelType, sleepTime);
    Thread.sleep(sleepTime);
It is inefficient to block threads with sleep. You have multiple options, for example:
- use a ScheduledExecutorService: each time you need to delay, do scheduler.schedule(() -> { future.complete(null); }, waitTime, TimeUnit.MILLISECONDS)
- or an even simpler approach, a CompletableFuture with delay: CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS).execute(() -> {})
A fuller sketch of the non-blocking variant is shown below.
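A minimal sketch of the non-blocking delayed-executor option (method and class names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class NonBlockingDelaySketch {
    /**
     * Completes the returned future after delayMs without blocking the calling
     * thread; the caller chains the actual LLM request onto it.
     */
    static CompletableFuture<Void> rateLimitDelay(long delayMs) {
        CompletableFuture<Void> delayed = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS)
            .execute(() -> delayed.complete(null));
        return delayed;
    }
}
```

The caller would then chain the request, e.g. rateLimitDelay(sleepTime).thenRun(...), instead of calling Thread.sleep; as noted in the top-level comments, delayedExecutor runs on the common ForkJoinPool, so a dedicated pool may be needed as a follow-up.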
Thanks @martin-gaievski for the review. This PR is about refactoring the existing LLM processor so that it supports models in a more general way, and the interface should be extendable to onboard any other models.
Thanks for cc'ing me. It does look like there are a lot of code conflicts with PR #264. We should think about how to merge them, and about the effort needed to merge and resolve conflicts without breaking functionality.
From the code comparison of the overlap between the two PRs, it is not trivial work to merge them and ensure nothing breaks.
Signed-off-by: Fen Qin <mfenqin@amazon.com>
Signed-off-by: Fen Qin <mfenqin@amazon.com>
Signed-off-by: Fen Qin <mfenqin@amazon.com>
Signed-off-by: Fen Qin <mfenqin@amazon.com>
Signed-off-by: Fen Qin <mfenqin@amazon.com>
099c74b to 9deee6f
Signed-off-by: Fen Qin <mfenqin@amazon.com>
martin-gaievski left a comment
This is a very good contribution, thank you for taking care of the details and designing with a good level of abstraction. Overall the code looks good to me; please address a few minor comments and I'll approve the PR.
/**
 * Interface for LLM connector implementations that handle different LLM providers
 */
public interface LLMConnector {
This is a really good abstraction, thank you for allocating time and addressing this.
What do you think about adding one more method here, supportsJsonMode()? Claude/Cohere don't support a JSON response natively, so you may receive a response that should be parsed differently.
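A sketch of how the suggested method could sit on the interface; the other method shown is illustrative, not the PR's actual signature:

```java
/** Interface for LLM connector implementations that handle different LLM providers. */
public interface LLMConnector {
    /** Sends a prompt to the provider and returns the raw model response (illustrative). */
    String invoke(String prompt);

    /**
     * Whether the provider can return structured JSON natively. Providers that
     * cannot (e.g. Claude, Cohere per the comment above) return false, signalling
     * that the response needs provider-specific parsing.
     */
    default boolean supportsJsonMode() {
        return false;
    }
}
```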
/**
 * Adaptive rate limiter that learns optimal rates per model and handles circuit breaking
 */
public class AdaptiveRateLimiter {
Can you please add an abstraction layer and extract an interface for the rate limiter, similar to what you have done for LLMConnector? I suggest the humble interface name RateLimiter.
return CompletableFuture.failedFuture(new RuntimeException("Model appears to be unavailable: " + modelId));
}

long delayMs = state.calculateDelay();
We need a metric for the retry event, something like llm_rate_limit_retries using the existing stats API; it's a counter that is incremented on every retry.
For the metric, I would like to know its final usage. How would the metrics help the user or administrator, and can we achieve the same without using metrics?
The metrics recommendation was for observability completeness; it's not essential for this PR. The rate limiter already:
- logs all throttling events (searchable in log aggregators)
- reports job success/failure counts in the response
- has a circuit breaker that stops requests on persistent failures
With such a metric it's possible to build a monitoring system on the provider side, for someone who may not have access to user data but can still detect spikes in retries. Later it should simplify troubleshooting for cases where lots of ratings are 0 because we retried and then failed, or where an experiment runs for a very long time (quite possible with cloud providers that give low quotas, which can lead to tens of minutes even for a simple experiment).
Instead of using stats, can we store it as part of the experiment result? When a rating is 0, we should mark it as a failure with its root cause. For an experiment running long, the customer should be able to query the experiment and check its current status, provided we store all that data somewhere along with the experiment metadata.
Do you mean the process of generating ratings rather than the experiment? Users are able to use an LLM for generating ratings in standalone mode, without any experiments. For example, my humble typical use case for SRW is: import query terms from an external system, generate judgment ratings with the LLM, run an experiment. If I already have ratings I will not generate them again at all.
boolean overwriteCache = Optional.ofNullable((Boolean) source.get(OVERWRITE_CACHE)).orElse(Boolean.FALSE);

// Parse connectorType - optional, defaults to OpenAI
ConnectorType connectorType = ConnectorType.OPENAI; // default
This should be null. It cannot default to OPENAI unless we also load an OpenAI modelId when no other connector is provided.
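A sketch of the suggested change; CONNECTOR_TYPE is assumed to be the field constant used by this parser, Locale is java.util.Locale, and the exact validation is up to the author:

```java
// Leave connectorType null when the request does not specify one, instead of
// silently assuming OPENAI without a matching default modelId (sketch only).
String connectorTypeRaw = (String) source.get(CONNECTOR_TYPE);
ConnectorType connectorType = connectorTypeRaw == null
    ? null
    : ConnectorType.valueOf(connectorTypeRaw.toUpperCase(Locale.ROOT));
```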
PutJudgmentRequest createRequest;
switch (type) {
    case LLM_JUDGMENT -> {
        String modelId = (String) source.get(MODEL_ID);
It is high time we break this into smaller methods. For that matter, we could create a class for everything that goes on under LLM judgment; the class names could be LlmjudgementType and UbiJudgementType.
| log.debug("DEBUG: predictSingleChunk called with modelId: {}", modelId); | ||
| RemoteInferenceInputDataSet dataset = (RemoteInferenceInputDataSet) mlInput.getInputDataset(); | ||
| Map<String, String> params = dataset.getParameters(); | ||
| public void predictSingleChunk( |
Add Javadoc to all public methods and clearly articulate their purpose.
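For example, something along these lines for predictSingleChunk; the parameter list is assumed from the surrounding context, and the description is illustrative:

```java
/**
 * Runs inference for a single chunk of the judgment prompt against the given
 * remote model and reports the raw model response asynchronously.
 *
 * @param mlInput  input data set containing the prompt parameters for this chunk
 * @param modelId  id of the remote model to invoke
 * @param listener callback receiving the model response, or the failure cause
 */
public void predictSingleChunk(MLInput mlInput, String modelId, ActionListener<String> listener) {
    // existing implementation
}
```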
@@ -105,58 +111,76 @@
private void generateJudgmentRatingInternal(Map<String, Object> metadata, ActionListener<List<Map<String, Object>>> listener) {
In my opinion we have excessive logging in this class, almost at each small piece of code execution. We should reduce this.
boolean overwriteCache
) {
private Map<String, Object> processQueryTextAsync(LlmJudgmentContext context, String queryTextWithCustomInput) {
    log.info("Processing query text judgment: {}", queryTextWithCustomInput);
I am assuming that if we have 1000 query terms, then this line will be executed for each query term: "Processing query text judgment: {}" x 1000 times.
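A common way to address this, if the author agrees, is to keep per-query detail at debug level (an illustrative change, not part of the PR):

```java
// Per-query-term detail at debug level; info level should only carry aggregate
// progress, so 1000 query terms do not produce 1000 info lines.
log.debug("Processing query text judgment: {}", queryTextWithCustomInput);
```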
return docIds;
}
List<String> processedDocIds = Collections.synchronizedList(new ArrayList<>());
AtomicBoolean hasFailure = new AtomicBoolean(false);
Where is this hasFailure variable used?
Description
This PR introduces comprehensive multi-provider LLM support to the OpenSearch Search Relevance plugin, enabling users to work with OpenAI, Claude, Cohere, and DeepSeek models through a unified interface. Additionally, it implements adaptive rate limiting to prevent API throttling and improve system reliability.
Core Features
1. Multi-Provider LLM Connector Framework
Transitioned from OpenAI-only to a flexible connector pattern supporting multiple LLM providers.
Key Components:
2. REST API Integration
Added optional connectorType and rateLimit parameters to LLM judgment requests.
API Usage:
{ "type": "LLM_JUDGMENT", "modelId": "anthropic.claude-3-5-haiku", "connectorType": "claude", "rateLimit": 1000, "querySetId": "test-queries" }3. Adaptive Rate Limiting System
Prevents API rate limit errors through adaptive learning and circuit breaking.
Key Features:
• Per-Model State: Separate rate limiting for each model/connector combination
• Adaptive Learning: Automatically adjusts delays based on success/failure patterns
• Circuit Breaker: Temporarily stops requests after consecutive failures (threshold: 10)
• Conservative Recovery: Gradual rate reduction (0.9x) after sustained success
• Resource Management: Proper thread pool lifecycle with cleanup
Rate Limiting Logic:
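A sketch reconstructing the described behavior from the key features above (per-model state, adaptive delays, circuit breaker threshold of 10, 0.9x recovery factor); class and method names are illustrative, not the PR's actual code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ModelRateState {
    private static final int CIRCUIT_BREAKER_THRESHOLD = 10; // consecutive failures before opening
    private static final double RECOVERY_FACTOR = 0.9;       // gradual delay reduction on success

    private long currentDelayMs = 0;
    private int consecutiveFailures = 0;

    synchronized long calculateDelay() {
        return currentDelayMs;
    }

    synchronized void onSuccess() {
        consecutiveFailures = 0;
        // Conservative recovery: shrink the delay slowly after sustained success.
        currentDelayMs = (long) (currentDelayMs * RECOVERY_FACTOR);
    }

    synchronized void onThrottled(long suggestedBackoffMs) {
        consecutiveFailures++;
        // Adaptive learning: grow the delay when the provider throttles us.
        currentDelayMs = Math.max(currentDelayMs + suggestedBackoffMs, suggestedBackoffMs);
    }

    synchronized boolean circuitOpen() {
        return consecutiveFailures >= CIRCUIT_BREAKER_THRESHOLD;
    }
}

// Per-model/connector state, matching the "Per-Model State" feature above.
class AdaptiveRateLimiterSketch {
    private final Map<String, ModelRateState> states = new ConcurrentHashMap<>();

    ModelRateState stateFor(String modelKey) {
        return states.computeIfAbsent(modelKey, k -> new ModelRateState());
    }
}
```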
E2E Testing
/docs/llm-model, validated for 4 ConnectorTypes: OpenAI, Claude, DeepSeek and Cohere
Issues Resolved
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.